// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package physicalop

import (
	"bytes"
	"fmt"

	"github.com/pingcap/tidb/pkg/expression"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/planner/core/base"
	"github.com/pingcap/tidb/pkg/planner/property"
	"github.com/pingcap/tidb/pkg/planner/util"
	"github.com/pingcap/tidb/pkg/planner/util/costusage"
	"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
	"github.com/pingcap/tidb/pkg/planner/util/utilfuncp"
	"github.com/pingcap/tidb/pkg/util/plancodec"
	"github.com/pingcap/tidb/pkg/util/size"
	"github.com/pingcap/tipb/go-tipb"
)

// PhysicalSort is the physical operator of sort, which implements a memory sort.
type PhysicalSort struct {
	BasePhysicalPlan

	ByItems []*util.ByItems
	// whether this operator only need to sort the data of one partition.
	// it is true only if it is used to sort the sharded data of the window function.
	IsPartialSort bool
}

// Init initializes PhysicalSort.
func (p PhysicalSort) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalSort {
	p.BasePhysicalPlan = NewBasePhysicalPlan(ctx, plancodec.TypeSort, &p, offset)
	p.SetChildrenReqProps(props)
	p.SetStats(stats)
	return &p
}

// Clone implements op.PhysicalPlan interface.
func (p *PhysicalSort) Clone(newCtx base.PlanContext) (base.PhysicalPlan, error) {
	cloned := new(PhysicalSort)
	cloned.SetSCtx(newCtx)
	cloned.IsPartialSort = p.IsPartialSort
	base, err := p.BasePhysicalPlan.CloneWithSelf(newCtx, cloned)
	if err != nil {
		return nil, err
	}
	cloned.BasePhysicalPlan = *base
	for _, it := range p.ByItems {
		cloned.ByItems = append(cloned.ByItems, it.Clone())
	}
	return cloned, nil
}

// ExtractCorrelatedCols implements op.PhysicalPlan interface.
func (p *PhysicalSort) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
	corCols := make([]*expression.CorrelatedColumn, 0, len(p.ByItems))
	for _, item := range p.ByItems {
		corCols = append(corCols, expression.ExtractCorColumns(item.Expr)...)
	}
	return corCols
}

// MemoryUsage return the memory usage of PhysicalSort
func (p *PhysicalSort) MemoryUsage() (sum int64) {
	if p == nil {
		return
	}

	sum = p.BasePhysicalPlan.MemoryUsage() + size.SizeOfSlice + int64(cap(p.ByItems))*size.SizeOfPointer +
		size.SizeOfBool
	for _, byItem := range p.ByItems {
		sum += byItem.MemoryUsage()
	}
	return
}

// ExplainInfo implements Plan interface.
func (p *PhysicalSort) ExplainInfo() string {
	buffer := bytes.NewBufferString("")
	buffer = util.ExplainByItems(p.SCtx().GetExprCtx().GetEvalCtx(), buffer, p.ByItems)
	if p.TiFlashFineGrainedShuffleStreamCount > 0 {
		fmt.Fprintf(buffer, ", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)
	}
	return buffer.String()
}

// Attach2Task implements PhysicalPlan interface.
func (p *PhysicalSort) Attach2Task(tasks ...base.Task) base.Task {
	return utilfuncp.Attach2Task4PhysicalSort(p, tasks...)
}

// GetCost calculates the cost of the PhysicalSort operation based on the input count and schema.
func (p *PhysicalSort) GetCost(count float64, schema *expression.Schema) float64 {
	return utilfuncp.GetCost4PhysicalSort(p, count, schema)
}

// GetPlanCostVer1 calculates the cost of the PhysicalSort plan for a given task type and options.
func (p *PhysicalSort) GetPlanCostVer1(taskType property.TaskType, option *optimizetrace.PlanCostOption) (float64, error) {
	return utilfuncp.GetPlanCostVer14PhysicalSort(p, taskType, option)
}

// GetPlanCostVer2 calculates the cost of the PhysicalSort plan for a given task type and options, returning a CostVer2 object.
func (p *PhysicalSort) GetPlanCostVer2(taskType property.TaskType, option *optimizetrace.PlanCostOption, isChildOfINL ...bool) (costusage.CostVer2, error) {
	return utilfuncp.GetPlanCostVer24PhysicalSort(p, taskType, option, isChildOfINL...)
}

// ToPB converts the PhysicalSort operator to a Protocol Buffer representation.
func (p *PhysicalSort) ToPB(ctx *base.BuildPBContext, storeType kv.StoreType) (*tipb.Executor, error) {
	return utilfuncp.ToPB4PhysicalSort(p, ctx, storeType)
}

// ResolveIndices implements Plan interface.
func (p *PhysicalSort) ResolveIndices() (err error) {
	return utilfuncp.ResolveIndicesForSort(&p.BasePhysicalPlan)
}

// CloneForPlanCache implements the base.Plan interface.
// todo: after all physical are moved, this function can be generated by codegen.
func (p *PhysicalSort) CloneForPlanCache(newCtx base.PlanContext) (base.Plan, bool) {
	cloned := new(PhysicalSort)
	*cloned = *p
	basePlan, baseOK := p.BasePhysicalPlan.CloneForPlanCacheWithSelf(newCtx, cloned)
	if !baseOK {
		return nil, false
	}
	cloned.BasePhysicalPlan = *basePlan
	cloned.ByItems = util.CloneByItemss(p.ByItems)
	return cloned, true
}
